1 /*
2  * Collie - An asynchronous event-driven network framework using Dlang development
3  *
4  * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd 
5  *
6  * Developer: putao's Dlang team
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module collie.bootstrap.server;
12 
13 import kiss.exception;
14 import kiss.logger;
15 import kiss.util.timer;
16 
17 import collie.net;
18 import collie.channel;
19 import collie.bootstrap.serversslconfig;
20 import collie.bootstrap.exception;
21 
22 import std.exception;
23 
/**
 * Fluent builder and runner for a TCP server.
 *
 * Configure it with the chainable setters, pick an address with one of the
 * `bind` overloads, then call `startListening` (non-blocking) or
 * `waitForStop` (runs the main event loop).
 *
 * One acceptor is created on the main event loop and, when an
 * `EventLoopGroup` has been supplied, one more acceptor per loop in the group.
 *
 * Params:
 *     PipeLine = pipeline type created for every accepted connection.
 */
final class ServerBootstrap(PipeLine)
{
    /// Creates a bootstrap that owns a freshly constructed main event loop.
    this()
    {
        _loop = new EventLoop();
    }

    /// Creates a bootstrap that uses the caller-supplied main event loop.
    this(EventLoop loop)
    {
        _loop = loop;
    }

    /// Sets the factory used to build the accept pipeline of each acceptor.
    /// When none is set, `AcceptPipeline.create()` is used instead.
    auto pipeline(shared AcceptPipelineFactory factory)
    {
        _acceptorPipelineFactory = factory;
        return this;
    }

    /// Sets the SSL configuration. It is only consulted when the module is
    /// compiled with `version = USE_SSL`.
    auto setSSLConfig(ServerSSLConfig config)
    {
        _sslConfig = config;
        return this;
    }

    /// Sets the factory that builds the pipeline of every accepted connection.
    /// Mandatory: `startListening` throws when it is null.
    auto childPipeline(shared PipelineFactory!PipeLine factory)
    {
        _childPipelineFactory = factory;
        return this;
    }

    /// Sets the optional group of additional event loops; one extra acceptor
    /// is created for each loop in the group.
    auto group(EventLoopGroup group)
    {
        _group = group;
        return this;
    }

    /// Enables or disables the `reusePort` option on the listeners
    /// (enabled by default).
    auto setReusePort(bool ruse)
    {
        _rusePort = ruse;
        return this;
    }

    /**
     * Sets the heartbeat time-out in seconds.
     *
     * The stored value is either 0 or within 5 .. 1800:
     * $(UL
     *   $(LI 0 disables the heartbeat timer;)
     *   $(LI values below 5 are raised to 5;)
     *   $(LI values above 1800 are lowered to 1800.)
     * )
     */
    auto heartbeatTimeOut(uint second)
    {
        // 0 must stay 0: it is the documented "disabled" value and is what
        // getTimeWheelConfig() tests for. Clamping it to 5 made it impossible
        // to switch the heartbeat off again.
        if (second == 0)
            _timeOut = 0;
        else if (second < 5)
            _timeOut = 5;
        else if (second > 1800)
            _timeOut = 1800;
        else
            _timeOut = second;

        return this;
    }

    /// Selects the address the server will listen on.
    void bind(Address addr)
    {
        _address = addr;
    }

    /// Selects the listening address as `new InternetAddress(port)`.
    void bind(ushort port)
    {
        _address = new InternetAddress(port);
    }

    /// Selects the listening address as `new InternetAddress(ip, port)`.
    void bind(string ip, ushort port)
    {
        _address = new InternetAddress(ip, port);
    }

    /// Stops every acceptor created by `startListening`. Does nothing when
    /// the server is not listening.
    void stopListening()
    {
        if (!_listening)
            return;
        scope (exit)
        {
            // Drop the stopped acceptors so that a later startListening()
            // starts from a clean slate instead of accumulating stale entries
            // that would be stopped a second time.
            _serverlist = null;
            _mainAccept = null;
            _listening = false;
        }
        foreach (ref accept; _serverlist)
        {
            accept.stop();
        }
        if (_mainAccept !is null)
            _mainAccept.stop();
    }

    /// Stops the loop group (when one is set) and the main event loop.
    /// Does nothing unless `waitForStop` has been called.
    void stop()
    {
        if (!_isLoopWait)
            return;
        scope (exit)
            _isLoopWait = false;
        // The group is optional, exactly as in join() and waitForStop().
        if (_group)
            _group.stop();
        _loop.stop();
    }

    /// Waits on the loop group, when one is set and the server is running.
    void join()
    {
        if (!_isLoopWait)
            return;
        if (_group)
            _group.wait();
    }

    /**
     * Starts listening if necessary, starts the loop group (when set) and
     * runs the main event loop.
     *
     * Throws: ServerIsRuningException when the server is already running;
     *         anything `startListening` throws.
     */
    void waitForStop()
    {
        if (_isLoopWait)
            throw new ServerIsRuningException("server is runing!");
        if (!_listening)
            startListening();
        _isLoopWait = true;
        if (_group)
            _group.start();
        _loop.run();
    }

    /**
     * Creates and activates the acceptors without running any event loop.
     *
     * Throws: ServerIsListeningException when already listening;
     *         ServerStartException when no address or child pipeline factory
     *         has been configured; anything raised while creating a listener.
     */
    void startListening()
    {
        if (_listening)
            throw new ServerIsListeningException("server is listening!");
        if (_address is null || _childPipelineFactory is null)
            throw new ServerStartException("the address or childPipelineFactory is null!");

        _listening = true;
        // If creating a listener throws (e.g. bind fails), the server must
        // not be left marked as listening, otherwise every retry would be
        // rejected with ServerIsListeningException.
        scope (failure)
            _listening = false;

        uint wheel, time;
        bool beat = getTimeWheelConfig(wheel, time);
        _mainAccept = creatorAcceptor(_loop);
        _mainAccept.initialize();
        if (beat)
            _mainAccept.startTimingWhile(wheel, time);
        if (_group)
        {
            foreach (loop; _group)
            {
                auto acceptor = creatorAcceptor(loop);
                acceptor.initialize();
                _serverlist ~= acceptor;
                if (beat)
                    acceptor.startTimingWhile(wheel, time);
            }
        }
        logDebug("server _listening!");
    }

    /// Returns the configured loop group (may be null).
    EventLoopGroup group()
    {
        return _group;
    }

    /// Returns the main event loop.
    @property EventLoop eventLoop()
    {
        return _loop;
    }

    /// Returns the configured listening address (null until `bind` is called).
    @property Address address()
    {
        return _address;
    }

protected:
    /**
     * Builds a bound, listening `TcpListener` on `loop` and wraps it in a
     * `ServerAcceptor`.
     *
     * Throws: SSLException (USE_SSL builds only) when an SSL configuration is
     *         set but no SSL_CTX could be generated from it.
     */
    auto creatorAcceptor(EventLoop loop)
    {
        auto acceptor = new TcpListener(loop, _address.addressFamily);
        if (_rusePort)
            acceptor.reusePort = _rusePort;
        acceptor.bind(_address);
        acceptor.listen(1024);
        {
            // Enable SO_LINGER with a zero time-out on the listener.
            Linger optLinger;
            optLinger.on = 1;
            optLinger.time = 0;
            acceptor.setOption(SocketOptionLevel.SOCKET, SocketOption.LINGER, optLinger);
        }
        AcceptPipeline pipe;
        if (_acceptorPipelineFactory)
            pipe = _acceptorPipelineFactory.newPipeline(acceptor);
        else
            pipe = AcceptPipeline.create();

        SSL_CTX* ctx = null;
        version (USE_SSL)
        {
            if (_sslConfig)
            {
                ctx = _sslConfig.generateSSLCtx();
                if (!ctx)
                    throw new SSLException("Can not gengrate SSL_CTX");
            }
        }

        return new ServerAcceptor!(PipeLine)(acceptor, pipe, _childPipelineFactory, ctx);
    }

    /**
     * Derives the timing-wheel parameters from the heartbeat time-out.
     *
     * Params:
     *     whileSize = receives the number of wheel slots.
     *     time      = receives `_timeOut * 1000 / whileSize`
     *                 (presumably the tick interval in milliseconds -
     *                 confirm against KissTimer).
     *
     * Returns: false when the heartbeat is disabled (`_timeOut == 0`), in
     *          which case both outputs keep their `out` default of 0;
     *          true otherwise.
     */
    bool getTimeWheelConfig(out uint whileSize, out uint time)
    {
        if (_timeOut == 0)
            return false;

        // Longer time-outs get more slots.
        if (_timeOut <= 40)
            whileSize = 50;
        else if (_timeOut <= 120)
            whileSize = 60;
        else if (_timeOut <= 600)
            whileSize = 100;
        else if (_timeOut < 1000)
            whileSize = 150;
        else
            whileSize = 180;

        time = _timeOut * 1000 / whileSize;
        return true;
    }

private:
    shared AcceptPipelineFactory _acceptorPipelineFactory;
    shared PipelineFactory!PipeLine _childPipelineFactory;

    // Acceptor running on the main event loop.
    ServerAcceptor!(PipeLine) _mainAccept;
    EventLoop _loop;

    // One acceptor per loop of _group.
    ServerAcceptor!(PipeLine)[] _serverlist;
    EventLoopGroup _group;

    bool _listening = false;
    bool _rusePort = true;
    bool _isLoopWait = false;
    // Heartbeat time-out in seconds; 0 means disabled.
    uint _timeOut = 0;
    Address _address;

    ServerSSLConfig _sslConfig = null;
}
267 
268 private:
269 
270 import std.functional;
271 import kiss.event.timer.common;
272 import collie.utils.memory;
273 import collie.net;
274 
/**
 * Terminal handler of an accept pipeline.
 *
 * It owns one `TcpListener`, turns every accepted socket into a stream
 * (optionally wrapped in SSL), builds the per-connection pipeline through the
 * child pipeline factory and keeps the resulting connections in an intrusive
 * doubly linked list headed by the sentinel `_list`.
 *
 * Params:
 *     PipeLine = pipeline type created for every accepted connection.
 */
final class ServerAcceptor(PipeLine) : InboundHandler!(Socket)
{
    /**
     * Appends this handler to `pipe`, finalizes the pipeline, attaches the
     * listener as its transport and registers the accept callback.
     *
     * Params:
     *     acceptor          = listener this acceptor serves.
     *     pipe              = accept pipeline; this handler is added at its back.
     *     clientPipeFactory = factory for the per-connection pipelines.
     *     ctx               = SSL context, or null for plain TCP.
     */
    this(TcpListener acceptor, AcceptPipeline pipe,
            shared PipelineFactory!PipeLine clientPipeFactory, SSL_CTX* ctx = null)
    {
        _acceptor = acceptor;
        _pipeFactory = clientPipeFactory;
        pipe.addBack(this);
        pipe.finalize();
        _pipe = pipe;
        _pipe.transport(_acceptor);
        _acceptor.onConnectionAccepted(&acceptCallBack);
        _sslctx = ctx;
        // Sentinel heads of the intrusive lists.
        _list = new ServerConnection!PipeLine();
        version (USE_SSL)
            _sharkList = new SSLHandShark();
    }

    /// Fires `transportActive` through the accept pipeline.
    void initialize()
    {
        _pipe.transportActive();
    }

    /// Fires `transportInactive` through the accept pipeline.
    void stop()
    {
        _pipe.transportInactive();
    }

    /**
     * Handles a newly accepted socket.
     *
     * With an SSL context (USE_SSL builds) the socket is wrapped in an
     * `SSLSocket` and parked in the handshake list until `doHandShark` is
     * called back; otherwise it is wrapped in a `TcpStream` and started
     * immediately.
     */
    override void read(Context ctx, Socket msg)
    {
        version (USE_SSL)
        {
            if (_sslctx)
            {
                auto ssl = SSL_new(_sslctx);
                if (ssl is null)
                {
                    // SSL_new returns null on failure; using the result
                    // unchecked would dereference a null pointer below.
                    // NOTE(review): the accepted socket is not closed on this
                    // path - confirm who owns `msg` here.
                    error("SSL_new error: fd = ", msg.handle());
                    return;
                }
                static if (IOMode == IO_MODE.iocp)
                {
                    BIO* readBIO = BIO_new(BIO_s_mem());
                    BIO* writeBIO = BIO_new(BIO_s_mem());
                    SSL_set_bio(ssl, readBIO, writeBIO);
                    SSL_set_accept_state(ssl);
                    auto asynssl = new SSLSocket(_acceptor.eventLoop, msg, ssl, readBIO, writeBIO);
                }
                else
                {
                    // SSL_set_fd returns 1 on success and 0 on failure, so
                    // the previous `< 0` test could never detect an error.
                    if (SSL_set_fd(ssl, msg.handle()) != 1)
                    {
                        error("SSL_set_fd error: fd = ", msg.handle());
                        // No handshake was started, so there is nothing to
                        // shut down; just release the SSL object.
                        // NOTE(review): the accepted socket is not closed on
                        // this path - confirm who owns `msg` here.
                        SSL_free(ssl);
                        return;
                    }
                    SSL_set_accept_state(ssl);
                    auto asynssl = new SSLSocket(_acceptor.eventLoop, msg, ssl);
                }
                auto shark = new SSLHandShark(asynssl, &doHandShark);

                // Push the pending handshake at the front of the list.
                shark.next = _sharkList.next;
                if (shark.next)
                    shark.next.prev = shark;
                shark.prev = _sharkList;
                _sharkList.next = shark;

                asynssl.start();
            }
            else
            {
                auto asyntcp = new TcpStream(_acceptor.eventLoop, msg);
                startSocket(asyntcp);
            }
        }
        else
        {
            auto asyntcp = new TcpStream(_acceptor.eventLoop, msg);
            startSocket(asyntcp);
        }
    }

    /// Starts the listener; a failure is logged, not propagated.
    override void transportActive(Context ctx)
    {
        logDebug("acept transportActive");
        try
        {
            _acceptor.start();
        }
        catch (Exception)
        {
            logError("acceptor start error!");
        }
    }

    /// Closes the listener and every tracked connection, then stops the
    /// listener's event loop.
    override void transportInactive(Context ctx)
    {
        _acceptor.close();
        auto con = _list.next;
        _list.next = null;
        while (con)
        {
            // Fetch the successor first: closing may unlink and free `tcon`.
            auto tcon = con;
            con = con.next;
            tcon.close();
        }
        _acceptor.eventLoop.stop();
    }

protected:
    /// Unlinks `conn` from the connection list and frees it.
    pragma(inline) void remove(ServerConnection!PipeLine conn)
    {
        conn.prev.next = conn.next;
        if (conn.next)
            conn.next.prev = conn.prev;
        gcFree(conn);
    }

    /// Listener callback: pushes the accepted stream through the accept
    /// pipeline, logging instead of propagating any exception.
    void acceptCallBack(TcpListener sender, TcpStream stream)
    {
        catchAndLogException(_pipe.read(stream));
    }

    /// The listener served by this acceptor.
    @property acceptor()
    {
        return _acceptor;
    }

    /**
     * Starts the heartbeat timer and timing wheel. Does nothing when the
     * timer already exists.
     *
     * Params:
     *     whileSize = number of slots of the timing wheel.
     *     time      = interval passed to `KissTimer`.
     */
    void startTimingWhile(uint whileSize, uint time)
    {
        if (_timer)
            return;
        _timer = new KissTimer(_acceptor.eventLoop, time);
        _timer.onTick(&doWheel);
        _wheel = new TimingWheel(whileSize);
        _timer.start();
    }

    /// Timer tick handler: advances the timing wheel.
    void doWheel(Object)
    {
        if (_wheel)
            _wheel.prevWheel();
    }

    version (USE_SSL)
    {
        /**
         * Handshake completion callback.
         *
         * Unlinks `shark` from the pending list and destroys it on exit.
         * A non-null `sock` means the handshake succeeded and the connection
         * is started; a null `sock` means it failed.
         */
        void doHandShark(SSLHandShark shark, SSLSocket sock)
        {
            shark.prev.next = shark.next;
            if (shark.next)
                shark.next.prev = shark.prev;
            scope (exit)
                shark.destroy();
            if (sock)
            {
                sock.setHandshakeCallBack(null);
                startSocket(sock);
            }
        }
    }

    /**
     * Builds the connection pipeline for `sock`, registers the connection at
     * the front of the list, activates it and, when the heartbeat is enabled,
     * adds it to the timing wheel. The stream is freed when the factory
     * returns no pipeline.
     */
    void startSocket(TcpStream sock)
    {
        auto pipe = _pipeFactory.newPipeline(sock);
        if (!pipe)
        {
            gcFree(sock);
            return;
        }
        pipe.finalize();
        auto con = new ServerConnection!PipeLine(pipe);
        con.serverAceptor = this;

        con.next = _list.next;
        if (con.next)
            con.next.prev = con;
        con.prev = _list;
        _list.next = con;

        con.initialize();
        if (_wheel)
            _wheel.addNewTimer(con);
    }

private:
    // Sentinel head of the intrusive list of live connections.
    ServerConnection!PipeLine _list;

    version (USE_SSL)
    {
        // Sentinel head of the list of handshakes still in progress.
        SSLHandShark _sharkList;
    }

    TcpListener _acceptor;
    KissTimer _timer;
    TimingWheel _wheel;
    AcceptPipeline _pipe;
    shared PipelineFactory!PipeLine _pipeFactory;

    SSL_CTX* _sslctx = null;
}
472 
/**
 * One accepted connection: ties a connection pipeline to the acceptor that
 * created it and acts as the pipeline's manager and as a timing-wheel entry.
 *
 * Instances are nodes of an intrusive doubly linked list (`prev` / `next`)
 * maintained by the owning `ServerAcceptor`.
 */
@trusted final class ServerConnection(PipeLine) : WheelTimer, PipelineManager
{
    /// Adopts `pipe` and registers this object as its pipeline manager.
    this(PipeLine pipe)
    {
        pipe.pipelineManager = this;
        this._pipe = pipe;
    }

    ~this()
    {
    }

    /// Fires `transportActive` through the connection pipeline.
    void initialize()
    {
        this._pipe.transportActive();
    }

    /// Fires `transportInactive` through the connection pipeline.
    void close()
    {
        this._pipe.transportInactive();
    }

    /// The acceptor that owns this connection.
    @property serverAceptor()
    {
        return this._manger;
    }

    /// ditto
    @property serverAceptor(ServerAcceptor!PipeLine manger)
    {
        this._manger = manger;
    }

    /// PipelineManager hook: detaches the pipeline, stops this wheel timer
    /// and asks the owning acceptor to unlink this connection.
    override void deletePipeline(PipelineBase pipeline)
    {
        this._pipe = null;
        pipeline.pipelineManager = null;
        this.stop();
        this._manger.remove(this);
    }

    /// PipelineManager hook: resets the time-out of this wheel timer.
    override void refreshTimeout()
    {
        this.rest();
    }

    /// WheelTimer hook: forwards the time-out to the pipeline; any exception
    /// it throws is collected and discarded.
    override void onTimeOut() nothrow
    {
        collectException(this._pipe.timeOut());
    }

private:
    // Used only for the sentinel head node of the acceptor's list.
    this()
    {
    }

    ServerConnection!PipeLine prev;
    ServerConnection!PipeLine next;
    ServerAcceptor!PipeLine _manger;
    PipeLine _pipe;
}
533 
version (USE_SSL)
{
    /**
     * Tracks one SSL socket while its handshake is in progress.
     *
     * The outcome is reported through the callback given at construction:
     * it receives the socket when the handshake completed and null when the
     * socket was closed first. Instances are nodes of an intrusive doubly
     * linked list (`prev` / `next`).
     */
    final class SSLHandShark
    {
        alias SSLHandSharkCallBack = void delegate(SSLHandShark shark, SSLSocket sock);

        /// Stores `sock` and `cback` and installs the close, read and
        /// handshake callbacks on the socket.
        this(SSLSocket sock, SSLHandSharkCallBack cback)
        {
            _cback = cback;
            _socket = sock;
            sock.setCloseCallBack(&onClose);
            sock.setReadCallBack(&readCallBack);
            sock.setHandshakeCallBack(&handSharkCallBack);
        }

    protected:
        /// Handshake finished: hand the socket to the callback, then forget it.
        void handSharkCallBack()
        {
            logDebug("the ssl handshark over");
            auto established = _socket;
            _cback(this, established);
            _socket = null;
        }

        /// Data arriving during the handshake phase is ignored.
        void readCallBack(ubyte[] buffer)
        {
        }

        /// Socket closed before the handshake completed: remove all callbacks
        /// from it and report the failure by passing null.
        void onClose()
        {
            logDebug("the ssl handshark fail");
            auto closing = _socket;
            closing.setCloseCallBack(null);
            closing.setReadCallBack(null);
            closing.setHandshakeCallBack(null);
            _socket = null;
            _cback(this, null);
        }

    private:
        // Used only for the sentinel head node of the pending list.
        this()
        {
        }

        SSLHandShark prev;
        SSLHandShark next;
        SSLSocket _socket;
        SSLHandSharkCallBack _cback;
    }
}